/*
 * @Author: lokei
 * @Date: 2022-09-21 18:45:37
 * @LastEditors: lokei
 * @LastEditTime: 2023-07-12 20:32:58
 * @Description: 
 */
package cn.lokei.task.config.queue;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.springframework.beans.factory.annotation.Autowired;
// import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

// import cn.lokei.task.config.iot.GatewayConfig;
import cn.lokei.task.entity.redis.RedisStream;
import cn.lokei.task.handler.RedisStreamListenerMessage;
import cn.lokei.task.handler.RedisStreamPendingHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
@Configuration
// @AutoConfigureAfter(GatewayConfig.class)
public class RedisStreamConfig {

    // @Value("${spring.redis-stream.key}")
    private String redisStreamKey = "smt_print";

    // @Value("${spring.redis-stream.group}")
    private String redisStreamGroup = "smt_print_grp";

    @Autowired
    private RedisStreamListenerMessage streamListener;

    @Autowired
    private RedisStream redisStream;

    @Autowired
    private RedisStreamPendingHandler pendingHandler;

    /**
     * 收到消息后不自动确认，需要用户选择合适的时机确认
     * <p>
     * 当某个消息被ACK，PEL列表就会减少
     * 如果忘记确认（ACK），则PEL列表会不断增长占用内存
     * 如果服务器发生意外，重启连接后将再次收到PEL中的消息ID列表
     *
     * @param factory
     * @return
     */
    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        checkGroup(redisStreamKey, redisStreamGroup);
        // 创建Stream消息监听容器配置
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                // 设置阻塞时间
                .pollTimeout(Duration.ofSeconds(1))
                // 配置消息类型
                .targetType(String.class)
                .build();
        // 创建Stream消息监听容器
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = StreamMessageListenerContainer
                .create(factory, options);
        // 设置消费手动提交配置
        Subscription subscription = listenerContainer.receive(
                // 设置消费者分组和名称
                Consumer.from(redisStreamGroup, "consumer-" + redisStreamKey),
                // 设置订阅Stream的key和获取偏移量，以及消费处理类
                StreamOffset.create(redisStreamKey, ReadOffset.lastConsumed()),
                streamListener);

        List<String> key_list = new ArrayList<String>();
        key_list.add("send_message");
        key_list.add("open_cabinet");
        for (String key : key_list) {
            checkGroup(key, key + "_grp");
            subscription = listenerContainer.receive(
                    // 设置消费者分组和名称
                    Consumer.from(key + "_grp", "consumer-" + key),
                    // 设置订阅Stream的key和获取偏移量，以及消费处理类
                    StreamOffset.create(key, ReadOffset.lastConsumed()),
                    streamListener);
        }

        // 监听容器启动
        listenerContainer.start();
        return subscription;
    }

    /**
     * 由于订阅需要先有stream，先做下检查
     */
    private void checkGroup(String key, String group) {
        // 创建需要校验的分组List
        List<String> consumers = new ArrayList<>();
        consumers.add(group);
        StreamInfo.XInfoConsumers infoGroups = null;
        try {
            // 获取Stream的所有组信息
            infoGroups = redisStream.consumers(key, group);
        } catch (RedisSystemException | InvalidDataAccessApiUsageException ex) {
            // log.error("group key not exist or commend error", ex);
            log.error("group key not exist or commend error");
        }

        // 遍历校验分组是否存在
        for (String consumer : consumers) {
            boolean consumerExist = Objects.nonNull(infoGroups);
            // 创建不存在的分组
            if (!consumerExist) {
                redisStream.creartGroup(key, consumer);
            }
        }
    }

    @Bean
    public void handlePending() {
        this.pendingHandler.run();
    }

}
